package com.atuguigu.flink.Day02.window;

import com.atuguigu.flink.Day01.Singlesensor.SensorSource;
import com.atuguigu.flink.sensor.SendsorReading;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

// 使用全窗口聚合函数计算窗口平均温度
public class Example6 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        env
                .addSource(new SensorSource())
                .filter(r->r.id.equals("sensor_1"))
                .keyBy(r->r.id)
                .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                .process(new WindowResult())
                .print();



        env.execute();

    }

    public static class WindowResult extends ProcessWindowFunction<SendsorReading,AvgPerWindow,String, TimeWindow> {

        @Override
        public void process(String s, Context context, Iterable<SendsorReading> iterable, Collector<AvgPerWindow> collector) throws Exception {
            Double sum=0.0;
            int count=0;
            for(SendsorReading e:iterable){
                sum+=e.temperture;
                count+=1;
            }
            Long windowStart=context.window().getStart();
            Long widowEnd=context.window().getEnd();
            double avg=sum/count;
            collector.collect(new AvgPerWindow(s,avg,windowStart,widowEnd));
        }
    }

    public static class AvgPerWindow{
        public String id;
        public Double avg;
        public Long windowStrat;
        public Long windowEnd;

        public AvgPerWindow() {
        }

        public AvgPerWindow(String id, Double avg, Long windowStrat, Long windowEnd) {
            this.id = id;
            this.avg = avg;
            this.windowStrat = windowStrat;
            this.windowEnd = windowEnd;
        }

        @Override
        public String toString() {
            return "AvgPerWindow{" +
                    "id='" + id + '\'' +
                    ", avg=" + avg +
                    ", windowStrat=" + windowStrat +
                    ", windowEnd=" + windowEnd +
                    '}';
        }
    }
}
